#!/bin/bash

set -eu

cur=$( cd "$( dirname "${BASH_SOURCE[0]}" )" && pwd )
source $cur/../_utils/test_prepare
WORK_DIR=$TEST_DIR/$TEST_NAME
source $cur/../_utils/shardddl_lib.sh

function DM_076_CASE() {
    run_sql_source1 "alter table ${shardddl1}.${tb1} add primary key(id);"
    run_dm_ctl_with_retry $WORK_DIR "127.0.0.1:$MASTER_PORT" \
            "query-status test" \
            "Unsupported add primary key" 1
}

function DM_076() {
    run_case 076 "single-source-pessimistic" "init_table 111" "clean_table" ""
}

function DM_077_CASE() {
    run_sql_source1 "alter table ${shardddl1}.${tb1} drop primary key;"
    run_dm_ctl_with_retry $WORK_DIR "127.0.0.1:$MASTER_PORT" \
            "query-status test" \
            "Unsupported drop primary key when alter-primary-key is false" 1
}

function DM_077() {
    run_case 077 "single-source-pessimistic" "run_sql_source1 \"create table ${shardddl1}.${tb1} (id int primary key);\"" "clean_table" ""
}

function DM_080_CASE() {
    run_sql_source1 "alter table ${shardddl1}.${tb1} add unique key idx_a(a);"
    run_sql_source1 "alter table ${shardddl1}.${tb1} add unique key idx_b(b);"
    run_sql_source1 "alter table ${shardddl1}.${tb1} add unique key idx_ab(a,b);"
    run_sql_source1 "insert into ${shardddl1}.${tb1} values(1,'aaa');"
    run_sql_source2 "insert into ${shardddl1}.${tb1} values(1,'aaa');"
    run_sql_source2 "alter table ${shardddl1}.${tb1} add unique key idx_a(a);"
    run_sql_source2 "alter table ${shardddl1}.${tb1} add unique key idx_b(b);"
    run_sql_source2 "alter table ${shardddl1}.${tb1} add unique key idx_ab(a,b);"
    run_sql_source1 "insert into ${shardddl1}.${tb1} values(2,'bbb');"
    run_sql_source2 "insert into ${shardddl1}.${tb1} values(2,'bbb');"
    run_sql_source2 "insert into ${shardddl1}.${tb2} values(2,'bbb');"
    run_sql_source2 "alter table ${shardddl1}.${tb2} add unique key idx_a(a);"
    run_sql_source2 "alter table ${shardddl1}.${tb2} add unique key idx_b(b);"
    run_sql_source2 "alter table ${shardddl1}.${tb2} add unique key idx_ab(a,b);"
    run_sql_source1 "insert into ${shardddl1}.${tb1} values(3,'ccc');"
    run_sql_source2 "insert into ${shardddl1}.${tb1} values(3,'ccc');"
    run_sql_source2 "insert into ${shardddl1}.${tb2} values(3,'ccc');"
    run_sql_tidb_with_retry "select count(1) from ${shardddl}.${tb};" "count(1): 3"
}

function DM_080() {
    run_case 080 "double-source-pessimistic" \
    "run_sql_source1 \"create table ${shardddl1}.${tb1} (a int, b varchar(10));\"; \
     run_sql_source2 \"create table ${shardddl1}.${tb1} (a int, b varchar(10));\"; \
     run_sql_source2 \"create table ${shardddl1}.${tb2} (a int, b varchar(10));\"" \
    "clean_table" "pessimistic"

    # currently not support optimistic
    #run_case 080 "double-source-optimistic" \
    #"run_sql_source1 \"create table ${shardddl1}.${tb1} (a int, b varchar(10));\"; \
    # run_sql_source2 \"create table ${shardddl1}.${tb1} (a int, b varchar(10));\"; \
    # run_sql_source2 \"create table ${shardddl1}.${tb2} (a int, b varchar(10));\"" \
    #"clean_table" "optimistic"
}

function DM_081_CASE() {
    run_sql_source1 "alter table ${shardddl1}.${tb1} add unique key idx_a(a);"
    run_sql_source1 "alter table ${shardddl1}.${tb1} add unique key idx_b(b);"
    run_sql_source1 "alter table ${shardddl1}.${tb1} add unique key idx_ab(a,b);"
    run_sql_source2 "alter table ${shardddl1}.${tb1} add unique key idx_a(a);"
    run_sql_source2 "alter table ${shardddl1}.${tb1} add unique key idx_b(b);"
    run_sql_source2 "alter table ${shardddl1}.${tb1} add unique key idx_ab(a,b);"
    run_sql_source2 "alter table ${shardddl1}.${tb2} add unique key idx_a(a);"
    run_sql_source2 "alter table ${shardddl1}.${tb2} add unique key idx_b(b);"
    run_sql_source2 "alter table ${shardddl1}.${tb2} add unique key idx_ab(a,b);"

    run_sql_source1 "alter table ${shardddl1}.${tb1} drop index idx_a;"
    run_sql_source1 "alter table ${shardddl1}.${tb1} drop index idx_b;"
    run_sql_source1 "alter table ${shardddl1}.${tb1} drop index idx_ab;"
    run_sql_source1 "insert into ${shardddl1}.${tb1} values(1,'aaa');"
    run_sql_source2 "insert into ${shardddl1}.${tb1} values(1,'aaa');"
    run_sql_source2 "alter table ${shardddl1}.${tb1} drop index idx_a;"
    run_sql_source2 "alter table ${shardddl1}.${tb1} drop index idx_b;"
    run_sql_source2 "alter table ${shardddl1}.${tb1} drop index idx_ab;"
    run_sql_source1 "insert into ${shardddl1}.${tb1} values(2,'bbb');"
    run_sql_source2 "insert into ${shardddl1}.${tb1} values(2,'bbb');"
    run_sql_source2 "insert into ${shardddl1}.${tb2} values(2,'bbb');"
    run_sql_source2 "alter table ${shardddl1}.${tb2} drop index idx_a;"
    run_sql_source2 "alter table ${shardddl1}.${tb2} drop index idx_b;"
    run_sql_source2 "alter table ${shardddl1}.${tb2} drop index idx_ab;"
    run_sql_source1 "insert into ${shardddl1}.${tb1} values(3,'ccc');"
    run_sql_source2 "insert into ${shardddl1}.${tb1} values(3,'ccc');"
    run_sql_source2 "insert into ${shardddl1}.${tb2} values(3,'ccc');"
    check_sync_diff $WORK_DIR $cur/conf/diff_config.toml
}

function DM_081() {
    run_case 081 "double-source-pessimistic" \
    "run_sql_source1 \"create table ${shardddl1}.${tb1} (a int, b varchar(10));\"; \
     run_sql_source2 \"create table ${shardddl1}.${tb1} (a int, b varchar(10));\"; \
     run_sql_source2 \"create table ${shardddl1}.${tb2} (a int, b varchar(10));\"" \
    "clean_table" "pessimistic"

    # currently not support optimistic
    # run_case 081 "double-source-optimistic" \
    #"run_sql_source1 \"create table ${shardddl1}.${tb1} (a int, b varchar(10));\"; \
    # run_sql_source2 \"create table ${shardddl1}.${tb1} (a int, b varchar(10));\"; \
    # run_sql_source2 \"create table ${shardddl1}.${tb2} (a int, b varchar(10));\"" \
    #"clean_table" "optimistic"
}


function DM_082_CASE() {
    run_sql_source1 "alter table ${shardddl1}.${tb1} rename index a to c;"
    run_sql_source1 "insert into ${shardddl1}.${tb1} values(1,'aaa');"
    run_sql_source2 "insert into ${shardddl1}.${tb1} values(2,'bbb');"
    run_sql_source2 "insert into ${shardddl1}.${tb2} values(3,'ccc');"
    run_sql_source2 "alter table ${shardddl1}.${tb1} rename index a to c;"
    run_sql_source1 "insert into ${shardddl1}.${tb1} values(4,'ddd');"
    run_sql_source2 "insert into ${shardddl1}.${tb1} values(5,'eee');"
    run_sql_source2 "insert into ${shardddl1}.${tb2} values(6,'fff');"
    run_sql_source2 "alter table ${shardddl1}.${tb2} rename index a to c;"
    run_sql_source1 "insert into ${shardddl1}.${tb1} values(7,'ggg');"
    run_sql_source2 "insert into ${shardddl1}.${tb1} values(8,'hhh');"
    run_sql_source2 "insert into ${shardddl1}.${tb2} values(9,'iii');"
    check_sync_diff $WORK_DIR $cur/conf/diff_config.toml
}

function DM_082() {
    run_case 082 "double-source-pessimistic" \
    "run_sql_source1 \"create table ${shardddl1}.${tb1} (a int unique key, b varchar(10));\"; \
     run_sql_source2 \"create table ${shardddl1}.${tb1} (a int unique key, b varchar(10));\"; \
     run_sql_source2 \"create table ${shardddl1}.${tb2} (a int unique key, b varchar(10));\"" \
    "clean_table" "pessimistic"

    # currently not support optimistic
    # run_case 082 "double-source-optimistic" \
    #"run_sql_source1 \"create table ${shardddl1}.${tb1} (a int unique key, b varchar(10));\"; \
    # run_sql_source2 \"create table ${shardddl1}.${tb1} (a int unique key, b varchar(10));\"; \
    # run_sql_source2 \"create table ${shardddl1}.${tb2} (a int unique key, b varchar(10));\"" \
    #"clean_table" "optimistic"
}

function DM_094_CASE() {
    # here we run ddl to make sure we flush first check point in syncer
    # otherwise the worker may dump again when restart
    run_sql_source1 "alter table ${shardddl1}.${tb1} add column col int;"
    run_sql_source1 "alter table ${shardddl1}.${tb2} add column col int;"
    run_sql_source2 "alter table ${shardddl1}.${tb1} add column col int;"
    run_sql_source2 "alter table ${shardddl1}.${tb2} add column col int;"
    check_sync_diff $WORK_DIR $cur/conf/diff_config.toml

    run_sql_source1 "insert into ${shardddl1}.${tb1} values(1,1);"
    run_sql_source2 "insert into ${shardddl1}.${tb1} values(2,2);"
    run_sql_source2 "insert into ${shardddl1}.${tb2} values(3,3);"
    run_sql_source1 "insert into ${shardddl1}.${tb2} values(4,4);"

    run_sql_source1 "alter table ${shardddl1}.${tb1} add column new_col1 int;"
    run_sql_source2 "alter table ${shardddl1}.${tb1} add column new_col1 int;"

    run_dm_ctl_with_retry $WORK_DIR "127.0.0.1:$MASTER_PORT" \
        "stop-task test" \
        "\"result\": true" 3
    
    run_sql_source1 "alter table ${shardddl1}.${tb2} add column new_col1 int;"
    run_sql_source2 "alter table ${shardddl1}.${tb2} add column new_col1 int;"

    run_dm_ctl_with_retry $WORK_DIR "127.0.0.1:$MASTER_PORT" \
        "start-task $cur/conf/double-source-$1.yaml" \
        "\"result\": true" 3

    check_sync_diff $WORK_DIR $cur/conf/diff_config.toml
}

function DM_094() {
    run_case 094 "double-source-pessimistic" "init_table 111 112 211 212" "clean_table" "pessimistic"
    run_case 094 "double-source-optimistic" "init_table 111 112 211 212" "clean_table" "optimistic"
}

function DM_095_CASE() {
    # here we run ddl to make sure we flush first check point in syncer
    # otherwise the worker may dump again when restart
    run_sql_source1 "alter table ${shardddl1}.${tb1} add column col int;"
    run_sql_source1 "alter table ${shardddl1}.${tb2} add column col int;"
    run_sql_source2 "alter table ${shardddl1}.${tb1} add column col int;"
    run_sql_source2 "alter table ${shardddl1}.${tb2} add column col int;"
    check_sync_diff $WORK_DIR $cur/conf/diff_config.toml
 
    run_sql_source1 "insert into ${shardddl1}.${tb1} values(1,1);"
    run_sql_source2 "insert into ${shardddl1}.${tb1} values(2,2);"
    run_sql_source2 "insert into ${shardddl1}.${tb2} values(3,3);"
    run_sql_source1 "insert into ${shardddl1}.${tb2} values(4,4);"

    run_sql_source1 "alter table ${shardddl1}.${tb1} add column new_col1 int;"
    run_sql_source2 "alter table ${shardddl1}.${tb1} add column new_col1 int;"

    # do not check pause result because we may pause when auto resume in optimistic
    # then pause-task "result": true may not 3
    run_dm_ctl_with_retry $WORK_DIR "127.0.0.1:$MASTER_PORT" \
        "pause-task test" \
    
    run_sql_source1 "alter table ${shardddl1}.${tb2} add column new_col1 int;"
    run_sql_source2 "alter table ${shardddl1}.${tb2} add column new_col1 int;"

    run_dm_ctl_with_retry $WORK_DIR "127.0.0.1:$MASTER_PORT" \
        "resume-task test" \
        "\"result\": true" 3
    
    check_sync_diff $WORK_DIR $cur/conf/diff_config.toml
}

function DM_095() {
    run_case 095 "double-source-pessimistic" "init_table 111 112 211 212" "clean_table" "pessimistic"
    run_case 095 "double-source-optimistic" "init_table 111 112 211 212" "clean_table" "optimistic"
}

function DM_096_CASE() {
    # here we run ddl to make sure we flush first check point in syncer
    # otherwise the worker may dump again when restart
    run_sql_source1 "alter table ${shardddl1}.${tb1} add column col int;"
    run_sql_source1 "alter table ${shardddl1}.${tb2} add column col int;"
    run_sql_source2 "alter table ${shardddl1}.${tb1} add column col int;"
    run_sql_source2 "alter table ${shardddl1}.${tb2} add column col int;"
    check_sync_diff $WORK_DIR $cur/conf/diff_config.toml
 
    run_sql_source1 "insert into ${shardddl1}.${tb1} values(1,1);"
    run_sql_source2 "insert into ${shardddl1}.${tb1} values(2,2);"
    run_sql_source2 "insert into ${shardddl1}.${tb2} values(3,3);"
    run_sql_source1 "insert into ${shardddl1}.${tb2} values(4,4);"

    run_sql_source1 "alter table ${shardddl1}.${tb1} add column new_col1 int;"
    run_sql_source2 "alter table ${shardddl1}.${tb1} add column new_col1 int;"

    run_dm_ctl_with_retry $WORK_DIR "127.0.0.1:$MASTER_PORT" \
        "pause-task test" \
    
    run_sql_source1 "alter table ${shardddl1}.${tb2} add column new_col1 int;"
    run_sql_source2 "alter table ${shardddl1}.${tb2} add column new_col1 int;"

    # TODO: uncomment after we support update-task
    # run_dm_ctl_with_retry $WORK_DIR "127.0.0.1:$MASTER_PORT" \
    #     "update-task $cur/conf/double-source-$1.yaml" \
    #     "\"result\": true" 1

    run_dm_ctl_with_retry $WORK_DIR "127.0.0.1:$MASTER_PORT" \
        "resume-task test" \
        "\"result\": true" 3
    
    check_sync_diff $WORK_DIR $cur/conf/diff_config.toml
}

function DM_096() {
    run_case 096 "double-source-pessimistic" "init_table 111 112 211 212" "clean_table" "pessimistic"
    run_case 096 "double-source-optimistic" "init_table 111 112 211 212" "clean_table" "optimistic"
}

function DM_097_CASE() {
    # here we run ddl to make sure we flush first check point in syncer
    # otherwise the worker may dump again when restart
    run_sql_source1 "alter table ${shardddl1}.${tb1} add column col int;"
    run_sql_source1 "alter table ${shardddl1}.${tb2} add column col int;"
    run_sql_source2 "alter table ${shardddl1}.${tb1} add column col int;"
    run_sql_source2 "alter table ${shardddl1}.${tb2} add column col int;"
    check_sync_diff $WORK_DIR $cur/conf/diff_config.toml
 
    run_sql_source1 "insert into ${shardddl1}.${tb1} values(1,1);"
    run_sql_source2 "insert into ${shardddl1}.${tb1} values(2,2);"
    run_sql_source2 "insert into ${shardddl1}.${tb2} values(3,3);"
    run_sql_source1 "insert into ${shardddl1}.${tb2} values(4,4);"

    run_sql_source1 "alter table ${shardddl1}.${tb1} add column new_col1 int;"
    run_sql_source2 "alter table ${shardddl1}.${tb1} add column new_col1 int;"

    ps aux | grep dm-master |awk '{print $2}'|xargs kill || true
    check_port_offline $MASTER_PORT1 20
    
    run_sql_source1 "alter table ${shardddl1}.${tb2} add column new_col1 int;"
    run_sql_source2 "alter table ${shardddl1}.${tb2} add column new_col1 int;"

    run_dm_master $WORK_DIR/master $MASTER_PORT $cur/conf/dm-master.toml
    check_rpc_alive $cur/../bin/check_master_online 127.0.0.1:$MASTER_PORT

    check_sync_diff $WORK_DIR $cur/conf/diff_config.toml
}

function DM_097() {
    run_case 097 "double-source-pessimistic" "init_table 111 112 211 212" "clean_table" "pessimistic"
    run_case 097 "double-source-optimistic" "init_table 111 112 211 212" "clean_table" "optimistic"
}

function DM_098_CASE() {
    # here we run ddl to make sure we flush first check point in syncer
    # otherwise the worker may dump again when restart
    run_sql_source1 "alter table ${shardddl1}.${tb1} add column col int;"
    run_sql_source1 "alter table ${shardddl1}.${tb2} add column col int;"
    run_sql_source2 "alter table ${shardddl1}.${tb1} add column col int;"
    run_sql_source2 "alter table ${shardddl1}.${tb2} add column col int;"
    check_sync_diff $WORK_DIR $cur/conf/diff_config.toml
 
    run_sql_source1 "insert into ${shardddl1}.${tb1} values(1,1);"
    run_sql_source2 "insert into ${shardddl1}.${tb1} values(2,2);"
    run_sql_source2 "insert into ${shardddl1}.${tb2} values(3,3);"
    run_sql_source1 "insert into ${shardddl1}.${tb2} values(4,4);"

    run_sql_source1 "alter table ${shardddl1}.${tb1} add column new_col1 int;"

    ps aux | grep dm-worker1 |awk '{print $2}'|xargs kill || true
    check_port_offline $WORKER1_PORT 20

    run_sql_source2 "alter table ${shardddl1}.${tb1} add column new_col1 int;"
    run_sql_source1 "alter table ${shardddl1}.${tb2} add column new_col1 int;"

    run_dm_worker $WORK_DIR/worker1 $WORKER1_PORT $cur/conf/dm-worker1.toml
    check_rpc_alive $cur/../bin/check_worker_online 127.0.0.1:$WORKER1_PORT

    run_sql_source2 "alter table ${shardddl1}.${tb2} add column new_col1 int;"

    check_sync_diff $WORK_DIR $cur/conf/diff_config.toml
}

function DM_098() {
    run_case 098 "double-source-pessimistic" "init_table 111 112 211 212" "clean_table" "pessimistic"
}

function DM_099_CASE() {
    # here we run ddl to make sure we flush first check point in syncer
    # otherwise the worker may dump again when restart
    run_sql_source1 "alter table ${shardddl1}.${tb1} add column col int;"
    run_sql_source1 "alter table ${shardddl1}.${tb2} add column col int;"
    run_sql_source2 "alter table ${shardddl1}.${tb1} add column col int;"
    run_sql_source2 "alter table ${shardddl1}.${tb2} add column col int;"
    check_sync_diff $WORK_DIR $cur/conf/diff_config.toml
 
    run_sql_source1 "insert into ${shardddl1}.${tb1} values(1,1);"
    run_sql_source2 "insert into ${shardddl1}.${tb1} values(2,2);"
    run_sql_source2 "insert into ${shardddl1}.${tb2} values(3,3);"
    run_sql_source1 "insert into ${shardddl1}.${tb2} values(4,4);"

    run_sql_source1 "alter table ${shardddl1}.${tb1} add column new_col1 int;"
    run_sql_source2 "alter table ${shardddl1}.${tb1} add column new_col1 int;"

    ps aux | grep dm-worker2 |awk '{print $2}'|xargs kill || true
    check_port_offline $WORKER2_PORT 20

    run_sql_source1 "alter table ${shardddl1}.${tb2} add column new_col1 int;"

    run_dm_worker $WORK_DIR/worker2 $WORKER2_PORT $cur/conf/dm-worker2.toml
    check_rpc_alive $cur/../bin/check_worker_online 127.0.0.1:$WORKER2_PORT

    run_sql_source2 "alter table ${shardddl1}.${tb2} add column new_col1 int;"

    check_sync_diff $WORK_DIR $cur/conf/diff_config.toml
}

function DM_099() {
    run_case 099 "double-source-pessimistic" "init_table 111 112 211 212" "clean_table" "pessimistic"
}

function DM_100_CASE() {
    # here we run ddl to make sure we flush first check point in syncer
    # otherwise the worker may dump again when restart
    run_sql_source1 "alter table ${shardddl1}.${tb1} add column col int;"
    run_sql_source1 "alter table ${shardddl1}.${tb2} add column col int;"
    run_sql_source2 "alter table ${shardddl1}.${tb1} add column col int;"
    run_sql_source2 "alter table ${shardddl1}.${tb2} add column col int;"
    check_sync_diff $WORK_DIR $cur/conf/diff_config.toml
 
    run_sql_source1 "insert into ${shardddl1}.${tb1} values(1,1);"
    run_sql_source2 "insert into ${shardddl1}.${tb1} values(2,2);"
    run_sql_source2 "insert into ${shardddl1}.${tb2} values(3,3);"
    run_sql_source1 "insert into ${shardddl1}.${tb2} values(4,4);"

    run_sql_source1 "alter table ${shardddl1}.${tb1} add column new_col1 int;"

    ps aux | grep dm-worker1 |awk '{print $2}'|xargs kill || true
    check_port_offline $WORKER1_PORT 20

    run_sql_source2 "alter table ${shardddl1}.${tb1} add column new_col1 int;"
    run_sql_source1 "alter table ${shardddl1}.${tb2} add column new_col1 int;"

    run_dm_worker $WORK_DIR/worker1 $WORKER1_PORT $cur/conf/dm-worker1.toml
    check_rpc_alive $cur/../bin/check_worker_online 127.0.0.1:$WORKER1_PORT

    run_sql_source2 "alter table ${shardddl1}.${tb2} add column new_col1 int;"

    check_sync_diff $WORK_DIR $cur/conf/diff_config.toml
}

function DM_100() {
    run_case 100 "double-source-optimistic" "init_table 111 112 211 212" "clean_table" "optimistic"
}

function DM_101_CASE() {
    # here we run ddl to make sure we flush first check point in syncer
    # otherwise the worker may dump again when restart
    run_sql_source1 "alter table ${shardddl1}.${tb1} add column col int;"
    run_sql_source1 "alter table ${shardddl1}.${tb2} add column col int;"
    run_sql_source2 "alter table ${shardddl1}.${tb1} add column col int;"
    run_sql_source2 "alter table ${shardddl1}.${tb2} add column col int;"
    check_sync_diff $WORK_DIR $cur/conf/diff_config.toml
 
    run_sql_source1 "insert into ${shardddl1}.${tb1} values(1,1);"
    run_sql_source2 "insert into ${shardddl1}.${tb1} values(2,2);"
    run_sql_source2 "insert into ${shardddl1}.${tb2} values(3,3);"
    run_sql_source1 "insert into ${shardddl1}.${tb2} values(4,4);"

    run_sql_source1 "alter table ${shardddl1}.${tb1} add column new_col1 int;"
    run_sql_source2 "alter table ${shardddl1}.${tb1} add column new_col1 int;"

    ps aux | grep dm-worker2 |awk '{print $2}'|xargs kill || true
    check_port_offline $WORKER2_PORT 20

    run_sql_source1 "alter table ${shardddl1}.${tb2} add column new_col1 int;"

    run_dm_worker $WORK_DIR/worker2 $WORKER2_PORT $cur/conf/dm-worker2.toml
    check_rpc_alive $cur/../bin/check_worker_online 127.0.0.1:$WORKER2_PORT

    run_sql_source2 "alter table ${shardddl1}.${tb2} add column new_col1 int;"

    check_sync_diff $WORK_DIR $cur/conf/diff_config.toml
}

function DM_101() {
    run_case 101 "double-source-optimistic" "init_table 111 112 211 212" "clean_table" "optimistic"
}

function DM_102_CASE() {
    run_sql_source1 "alter table ${shardddl1}.${tb1} add column new_col1 int default 0;"
    run_sql_source1 "insert into ${shardddl1}.${tb1} values (1,1);"
    run_sql_source2 "alter table ${shardddl1}.${tb1} add column new_col1 int default -1;"

    run_dm_ctl_with_retry $WORK_DIR "127.0.0.1:$MASTER_PORT" \
        "show-ddl-locks" \
        "\"ID\": \"test-\`shardddl\`.\`tb\`\"" 1

    run_dm_ctl_with_retry $WORK_DIR "127.0.0.1:$MASTER_PORT" \
        "unlock-ddl-lock test-\`shardddl\`.\`tb\`" \
        "\"result\": true" 1

    run_sql_source2 "insert into ${shardddl1}.${tb1} values (2,2);"

    run_sql_tidb_with_retry "select count(1) from ${shardddl}.${tb};" "count(1): 2"
}

function DM_102() {
    run_case 102 "double-source-pessimistic" "init_table 111 211" "clean_table" ""
}

function DM_103_CASE() {
    run_sql_source1 "insert into ${shardddl1}.${tb1} values(1,'aaa');"
    run_sql_source2 "insert into ${shardddl1}.${tb1} values(2,'bbb');"
    run_sql_source2 "insert into ${shardddl1}.${tb2} values(3,'ccc');"

    run_sql_source1 "alter table ${shardddl1}.${tb1} add column c double;"
    run_sql_source1 "alter table ${shardddl1}.${tb1} drop column b;"
    run_sql_source2 "alter table ${shardddl1}.${tb1} add column c double;"
    run_sql_source2 "alter table ${shardddl1}.${tb2} add column c double;"
    run_sql_source1 "alter table ${shardddl1}.${tb1} change a a bigint default 10;"
    run_sql_source2 "alter table ${shardddl1}.${tb1} drop column b;"
    run_sql_source2 "alter table ${shardddl1}.${tb1} change a a bigint default 10;"
    run_sql_source2 "alter table ${shardddl1}.${tb2} drop column b;"
    run_sql_source2 "alter table ${shardddl1}.${tb2} change a a bigint default 10;"

    check_sync_diff $WORK_DIR $cur/conf/diff_config.toml
}

function DM_103() {
    run_case 103 "double-source-pessimistic" \
    "run_sql_source1 \"create table ${shardddl1}.${tb1} (a int, b varchar(10));\"; \
     run_sql_source2 \"create table ${shardddl1}.${tb1} (a int, b varchar(10));\"; \
     run_sql_source2 \"create table ${shardddl1}.${tb2} (a int, b varchar(10));\"" \
    "clean_table" "pessimistic"
    run_case 103 "double-source-optimistic" \
    "run_sql_source1 \"create table ${shardddl1}.${tb1} (a int, b varchar(10));\"; \
     run_sql_source2 \"create table ${shardddl1}.${tb1} (a int, b varchar(10));\"; \
     run_sql_source2 \"create table ${shardddl1}.${tb2} (a int, b varchar(10));\"" \
    "clean_table" "optimistic"
}

function run() {
    init_cluster
    init_database
    start=71
    end=103
    except=(071 072 073 074 075 078 079 083 084 085 086 087 088 089 090 091 092 093)
    for i in $(seq -f "%03g" ${start} ${end}); do
        if [[ ${except[@]} =~ $i ]]; then
            continue
        fi
        DM_${i}
        sleep 1
    done
}

cleanup_data $shardddl
cleanup_data $shardddl1
cleanup_data $shardddl2
# also cleanup dm processes in case of last run failed
cleanup_process $*
run $*
cleanup_process $*

echo "[$(date)] <<<<<< test case $TEST_NAME success! >>>>>>"
